
[RayJob] Wait for workers before submitting jobs in sidecar mode #4429

Open
marosset wants to merge 3 commits into ray-project:master from marosset:sidecar-submitter-not-waiting-fix

Conversation

@marosset (Contributor)

Why are these changes needed?

In SidecarMode, the submitter can submit jobs before workers are connected to the Ray cluster, causing jobs to run on the head node.

This fix adds a wait loop that polls the Ray Dashboard API to verify that minReplicas workers have joined the cluster before submitting the job. The minimum expected worker count is retrieved from the spec and passed via the RAY_EXPECTED_MIN_WORKERS env var.
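Conceptually, the submitter's wait behaves like this standalone Go sketch. It is an illustration only, not the PR's code: the actual change generates an equivalent shell loop in the submitter container. The response shape mirrors the shell snippet reviewed below; the dashboard address and worker count here are assumptions.

package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"time"
)

// nodesSummary mirrors only the fields of the Ray Dashboard
// /nodes?view=summary response that the wait loop inspects.
type nodesSummary struct {
	Data struct {
		Summary []struct {
			Raylet struct {
				State string `json:"state"`
			} `json:"raylet"`
		} `json:"summary"`
	} `json:"data"`
}

// countAliveNodes queries the dashboard once and counts raylets in state ALIVE.
func countAliveNodes(dashboardURL string) (int, error) {
	resp, err := http.Get(dashboardURL + "/nodes?view=summary")
	if err != nil {
		return 0, err
	}
	defer resp.Body.Close()
	var s nodesSummary
	if err := json.NewDecoder(resp.Body).Decode(&s); err != nil {
		return 0, err
	}
	alive := 0
	for _, n := range s.Data.Summary {
		if n.Raylet.State == "ALIVE" {
			alive++
		}
	}
	return alive, nil
}

func main() {
	const expectedWorkers = 2 // stand-in for RAY_EXPECTED_MIN_WORKERS
	for {
		// Expect the workers plus one head node before submitting.
		if alive, err := countAliveNodes("http://localhost:8265"); err == nil && alive >= expectedWorkers+1 {
			fmt.Println("All expected nodes are registered.")
			return
		}
		time.Sleep(2 * time.Second)
	}
}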

Related issue number

Fixes #4199

Checks

  • I've made sure the tests are passing.
  • Testing Strategy
    • Unit tests
    • Manual tests
    • This PR is not tested :(

@marosset force-pushed the sidecar-submitter-not-waiting-fix branch 2 times, most recently from 2600631 to 4d404a3 on January 22, 2026 00:30
@marosset force-pushed the sidecar-submitter-not-waiting-fix branch from 4d404a3 to a665203 on January 27, 2026 00:01
@Future-Outlier (Member) left a comment

Hi, can you help me merge master?

@marosset force-pushed the sidecar-submitter-not-waiting-fix branch from a665203 to 30bfb39 on January 27, 2026 19:06
@marosset (Contributor Author)

Hi, can you help me merge master?

rebased - thanks!


@Future-Outlier changed the title from "Wait for workers before submitting jobs in sidecar mode" to "[RayJob] Wait for workers before submitting jobs in sidecar mode" Jan 28, 2026
@Future-Outlier (Member) left a comment

High level looks good.

@win5923 (Member) left a comment

LGTM!

Sorry, I just found some errors that need to be fixed.

@win5923 self-requested a review January 28, 2026 17:31
@marosset force-pushed the sidecar-submitter-not-waiting-fix branch from 30bfb39 to a995d8b on January 28, 2026 23:07
Signed-off-by: Mark Rossett <marosset@microsoft.com>
@marosset force-pushed the sidecar-submitter-not-waiting-fix branch from a995d8b to 05c0fec on January 28, 2026 23:14
@Future-Outlier (Member) left a comment

  1. If you want the job to run on the Ray cluster's worker nodes, I think you can just set rayStartParams's num-cpus: "0" like this (you can try this on the master branch):

  rayClusterSpec:
    rayVersion: '2.52.0'
    headGroupSpec:
      rayStartParams:
        num-cpus: "0"

  2. I think making sidecar mode's behavior similar to k8s job mode is more correct, so I'm willing to accept this PR.

@Future-Outlier dismissed their stale review January 29, 2026 07:28: "correct behavior"

@CheyuWu self-requested a review January 29, 2026 11:47
@win5923 (Member) left a comment

LGTM, thank you!

Also linking #3837 since we're planning to replace wget with an alternative tool.


// In SidecarMode, pass the expected minimum worker count so the submitter can wait for workers to register
if submissionMode == rayv1.SidecarMode && rayJobInstance.Spec.RayClusterSpec != nil {
	minWorkers := common.GetMinReplicasFromSpec(rayJobInstance.Spec.RayClusterSpec)
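The hunk is truncated here; presumably it continues by injecting the computed count into the submitter container's environment. A minimal hypothetical sketch of that step (the helper name and wiring are assumptions, not the PR's code):

package main

import (
	"fmt"
	"strconv"

	corev1 "k8s.io/api/core/v1"
)

// minWorkersEnvVar is a hypothetical helper showing how a computed worker
// count could be exposed to the submitter via RAY_EXPECTED_MIN_WORKERS.
func minWorkersEnvVar(minWorkers int32) corev1.EnvVar {
	return corev1.EnvVar{
		Name:  "RAY_EXPECTED_MIN_WORKERS",
		Value: strconv.Itoa(int(minWorkers)),
	}
}

func main() {
	fmt.Printf("%+v\n", minWorkersEnvVar(3))
}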
A Member left a comment

minReplicas can be 0 or a low number. I wonder if we should only check minReplicas when autoscaling is enabled; if autoscaling is not enabled, we should just check replicas.
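A minimal sketch of that suggestion (a hypothetical helper, assuming the rayv1 types from github.com/ray-project/kuberay/ray-operator/apis/ray/v1 are imported; not code from this PR):

// expectedWorkersForGroup sketches the suggestion above: honor MinReplicas
// only when autoscaling is enabled; otherwise wait for the full Replicas count.
func expectedWorkersForGroup(group rayv1.WorkerGroupSpec, autoscalingEnabled bool) int32 {
	if autoscalingEnabled {
		if group.MinReplicas != nil {
			return *group.MinReplicas * group.NumOfHosts
		}
		return 0
	}
	if group.Replicas != nil {
		return *group.Replicas * group.NumOfHosts
	}
	return 0
}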

@marosset (Contributor Author) left a comment

I can change this and push it as a separate commit.
I'll defer to others on what the best number of workers to wait for / check against is.

A Member left a comment

We can use this function:

// CalculateDesiredReplicas calculate desired worker replicas at the cluster level
func CalculateDesiredReplicas(ctx context.Context, cluster *rayv1.RayCluster) int32 {
	count := int32(0)
	for _, nodeGroup := range cluster.Spec.WorkerGroupSpecs {
		count += GetWorkerGroupDesiredReplicas(ctx, nodeGroup)
	}
	return count
}

A Member left a comment

RayJob in Kubernetes Job mode waits for the RayCluster to reach the Ready state, and the RayCluster's Ready state requires all pods to be running, including both the head pod and the worker pods. The number of worker pods is determined by CalculateDesiredReplicas.

@Future-Outlier (Member) left a comment

In SidecarMode, the submitter can submit jobs before workers are connected to the Ray cluster, causing jobs to run on the head node.

If the PR's goal is to wait for workers to start, you can use a config like this; then Ray's scheduler will not schedule jobs on the head node.

  rayClusterSpec:
    rayVersion: '2.52.0'
    headGroupSpec:
      rayStartParams:
        num-cpus: "0"

Comment on lines 84 to 107
// GetMinReplicasFromSpec calculates the minimum expected worker replicas from the RayClusterSpec.
// This is used in SidecarMode to determine how many workers should be registered before submitting the job.
func GetMinReplicasFromSpec(rayClusterSpec *rayv1.RayClusterSpec) int32 {
	if rayClusterSpec == nil {
		return 0
	}
	count := int32(0)
	for _, nodeGroup := range rayClusterSpec.WorkerGroupSpecs {
		if nodeGroup.Suspend != nil && *nodeGroup.Suspend {
			continue
		}
		minReplicas := int32(0)
		if nodeGroup.MinReplicas != nil && *nodeGroup.MinReplicas > 0 {
			minReplicas = *nodeGroup.MinReplicas
		} else if nodeGroup.Replicas != nil && *nodeGroup.Replicas > 0 {
			// Fall back to Replicas when MinReplicas is not set or is 0.
			// This handles static clusters where users only set Replicas.
			minReplicas = *nodeGroup.Replicas
		}
		count += minReplicas * nodeGroup.NumOfHosts
	}
	return count
}
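For illustration, a usage sketch of the function above (made-up values; assumes the function is in scope and k8s.io/utils/ptr for pointer literals):

spec := &rayv1.RayClusterSpec{
	WorkerGroupSpecs: []rayv1.WorkerGroupSpec{
		{
			// Static group: MinReplicas unset, so Replicas (2) is used.
			Replicas:   ptr.To(int32(2)),
			NumOfHosts: 1,
		},
		{
			// Autoscaling-style group: MinReplicas (1) wins over Replicas (4).
			MinReplicas: ptr.To(int32(1)),
			Replicas:    ptr.To(int32(4)),
			NumOfHosts:  2,
		},
	},
}
// 2*1 + 1*2 = 4 expected minimum workers.
fmt.Println(GetMinReplicasFromSpec(spec))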

A Member left a comment

Can we use the function CalculateDesiredReplicas?

// CalculateDesiredReplicas calculate desired worker replicas at the cluster level
func CalculateDesiredReplicas(ctx context.Context, cluster *rayv1.RayCluster) int32 {
	count := int32(0)
	for _, nodeGroup := range cluster.Spec.WorkerGroupSpecs {
		count += GetWorkerGroupDesiredReplicas(ctx, nodeGroup)
	}
	return count
}

Comment on lines 167 to 192
// Wait for the expected number of worker nodes to register with the Ray cluster.
// RAY_EXPECTED_MIN_WORKERS is set by the controller based on the MinReplicas in the RayClusterSpec.
// The loop queries the Ray Dashboard API for the number of alive nodes and
// continues until that number equals (expected_workers + 1), the +1 accounting for the head node.
// This ensures that worker pods are connected before the job is submitted; otherwise
// the job may run on the head node.
//
// Note: This loop will never time out and will wait indefinitely if workers never register.
// This can be mitigated by setting the RayJob's `activeDeadlineSeconds` field
// to enforce a maximum job execution time.
//
// The wget command includes the x-ray-authorization header if RAY_AUTH_TOKEN is set.
// This is required when Ray auth token mode is enabled; otherwise the request will fail with 401.
wgetAuthHeader := "${" + utils.RAY_AUTH_TOKEN_ENV_VAR + ":+--header \"x-ray-authorization: Bearer $" + utils.RAY_AUTH_TOKEN_ENV_VAR + "\"}"
waitForNodesLoop := []string{
	"if", "[", "-n", "\"$" + utils.RAY_EXPECTED_MIN_WORKERS + "\"", "]", "&&", "[", "\"$" + utils.RAY_EXPECTED_MIN_WORKERS + "\"", "-gt", "\"0\"", "]", ";", "then",
	"EXPECTED_NODES=$(($" + utils.RAY_EXPECTED_MIN_WORKERS + " + 1))", ";",
	"echo", strconv.Quote("Waiting for $EXPECTED_NODES nodes (1 head + $" + utils.RAY_EXPECTED_MIN_WORKERS + " workers) to register..."), ";",
	"until", "[",
	"\"$(wget " + wgetAuthHeader + " -q -O- " + address + "/nodes?view=summary 2>/dev/null | python3 -c \"import sys,json; d=json.load(sys.stdin); print(len([n for n in d.get('data',{}).get('summary',[]) if n.get('raylet',{}).get('state','')=='ALIVE']))\" 2>/dev/null || echo 0)\"",
	"-ge", "\"$EXPECTED_NODES\"", "]", ";",
	"do", "echo", strconv.Quote("Waiting for Ray nodes to register. Expected: $EXPECTED_NODES ..."), ";", "sleep", "2", ";", "done", ";",
	"echo", strconv.Quote("All expected nodes are registered."), ";",
	"fi", ";",
}
cmd = append(cmd, waitForNodesLoop...)
A Member left a comment

Can we use something like ray list nodes via Ray's CLI?
We are removing wget.

@marosset (Contributor Author) left a comment

@Future-Outlier - I pushed a commit that uses Python's urllib instead of wget.
Is this OK instead of ray list nodes?

@marosset (Contributor Author) commented Feb 2, 2026

In SidecarMode, the submitter can submit jobs before workers are connected to the Ray cluster, causing jobs to run on the head node.

If the PR's goal is to wait for workers to start, you can use a config like this; then Ray's scheduler will not schedule jobs on the head node.

  rayClusterSpec:
    rayVersion: '2.52.0'
    headGroupSpec:
      rayStartParams:
        num-cpus: "0"

This PR addresses #4199, where the primary issue does look like making sure the jobs do not get scheduled on the head node.

@Future-Outlier - Let me know if you think it would still be worthwhile to iterate on this PR, or if simply specifying headGroupSpec.rayStartParams.num-cpus: "0" is sufficient.

@Future-Outlier (Member)

Hi @marosset, we'd be glad to accept this PR, since we want to make k8s job mode and sidecar mode as similar as possible.

@marosset (Contributor Author) commented Feb 3, 2026

Hi @marosset, we'd be glad to accept this PR, since we want to make k8s job mode and sidecar mode as similar as possible.

Great, I'll address the above feedback then!

Signed-off-by: Mark Rossett <marosset@microsoft.com>
…ecar sumitter jobs

Signed-off-by: Mark Rossett <marosset@microsoft.com>
@Future-Outlier (Member)

Hi @marosset, do you mind fixing the CI errors?


4 participants